package util

import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.rdd.RDD

/**
  * Created by chenjianwen on 2016/3/7.
  */
object RDDOperation {

  /**
   * Sums the behavior (activity) score and the order score for each
   * (user, product) pair.
   *
   * Elements are re-keyed by (user, product) and combined with a
   * `fullOuterJoin` — NOT `join` — so pairs that appear in only one of the
   * two RDDs are kept (an inner join would silently drop them).
   *
   * @param oneRdd   (timeRange, Rating) pairs from user behavior/activity
   * @param orderRdd (timeRange, Rating) pairs from orders
   * @return (timeRange, Rating) pairs where the rating is the sum of both
   *         sides' ratings; the timeRange is taken from the behavior side
   *         when present, otherwise from the order side
   */
  def addActAndOrderScope(oneRdd: RDD[(Int, Rating)], orderRdd: RDD[(Int, Rating)]): RDD[(Int, Rating)] = {
    // Re-key each element by (user, product) so matching pairs line up in the join.
    def transToKeyPro = (x: (Int, Rating)) => {
      val (timeRange, rating) = x
      ((rating.user, rating.product), (timeRange, rating))
    }

    // Join exactly once and transform the result directly.
    // (The original computed the join twice and ran three count() actions
    // purely for debug printing — each of those is a full Spark job.)
    oneRdd.map(transToKeyPro).fullOuterJoin(orderRdd.map(transToKeyPro)).map {
      case ((userId, productId), (actOpt, orderOpt)) =>
        // After a full outer join at least one side is defined,
        // so orElse never leaves us with None here.
        val time = actOpt.orElse(orderOpt).map(_._1).getOrElse(0)
        val score = actOpt.map(_._2.rating).getOrElse(0.0) +
          orderOpt.map(_._2.rating).getOrElse(0.0)
        (time, Rating(userId, productId, score))
    }
  }
}
